#!/usr/bin/env python3

# check for the venv
from lib import sanity_check

sanity_check.check_venv(__file__)

import argparse
import contextlib
import glob
import os
import shlex
import subprocess
import sys
import tempfile
from typing import Iterator, List
from unittest import mock

import django
import orjson
import responses
from django.conf import settings
from django.test.utils import get_runner

# Glob patterns for the files that are expected to have 100% backend
# test coverage.  Anything matched here that is not also matched by
# not_yet_fully_covered ends up in enforce_fully_covered below.
target_fully_covered = [
    "analytics/lib/*.py",
    "analytics/models.py",
    "analytics/tests/*.py",
    "analytics/views.py",
    # zerver/ and zerver/lib/ are important core files
    "zerver/*.py",
    "zerver/lib/*.py",
    "zerver/lib/*/*.py",
    "zerver/lib/*/*/*.py",
    "zerver/data_import/*.py",
    "zerver/templatetags/*.py",
    "zerver/tornado/*.py",
    # Billing files require 100% test coverage
    "corporate/lib/stripe.py",
    "corporate/views.py",
    # Test files should have 100% coverage; test code that isn't run
    # is likely a bug in the test.
    "zerver/tests/*.py",
    "corporate/tests/*.py",
    # As a project, we require 100% test coverage in the views files.
    "zerver/views/*.py",
    "zproject/backends.py",
    "confirmation/*.py",
    "zerver/webhooks/*/*.py",
    # Once we have a nice negative tests system, we can also add the
    # more deeply nested webhook modules:
    # 'zerver/webhooks/*/*/*.py',
    "zerver/worker/*.py",
]

# Files matched by target_fully_covered that do not have 100% coverage
# yet.  They are subtracted when enforce_fully_covered is built, and a
# full `--coverage` run in main() reports an error if one of them turns
# out to be fully covered, so that it gets removed from this list.
not_yet_fully_covered = [
    # Analytics fixtures library is used to generate test fixtures;
    # isn't properly accounted for in test coverage analysis since it
    # runs before tests.
    "analytics/lib/fixtures.py",
    # We have 100% coverage on the new stuff; need to refactor old stuff.
    "analytics/views.py",
    # Major lib files should have 100% coverage
    "zerver/lib/addressee.py",
    "zerver/lib/markdown/__init__.py",
    "zerver/lib/cache.py",
    "zerver/lib/cache_helpers.py",
    "zerver/lib/i18n.py",
    "zerver/lib/email_notifications.py",
    "zerver/lib/send_email.py",
    "zerver/lib/url_preview/preview.py",
    "zerver/worker/queue_processors.py",
    # Markdown sub-libs should have full coverage too; a lot are really close
    "zerver/lib/markdown/api_arguments_table_generator.py",
    "zerver/lib/markdown/fenced_code.py",
    "zerver/lib/markdown/help_relative_links.py",
    "zerver/lib/markdown/nested_code_blocks.py",
    # Other lib files that ideally would have full coverage, but aren't sorted
    "zerver/__init__.py",
    "zerver/filters.py",
    "zerver/middleware.py",
    "zerver/lib/bot_lib.py",
    "zerver/lib/camo.py",
    "zerver/lib/debug.py",
    "zerver/lib/digest.py",
    "zerver/lib/error_notify.py",
    "zerver/lib/export.py",
    "zerver/lib/fix_unreads.py",
    "zerver/lib/html_diff.py",
    "zerver/lib/import_realm.py",
    "zerver/lib/logging_util.py",
    "zerver/lib/migrate.py",
    "zerver/lib/outgoing_webhook.py",
    "zerver/lib/profile.py",
    "zerver/lib/queue.py",
    "zerver/lib/sqlalchemy_utils.py",
    "zerver/lib/storage.py",
    "zerver/lib/timeout.py",
    "zerver/lib/unminify.py",
    "zerver/lib/utils.py",
    "zerver/lib/zephyr.py",
    "zerver/templatetags/app_filters.py",
    "zerver/templatetags/minified_js.py",
    # Low priority for coverage
    "zerver/lib/ccache.py",
    "zerver/lib/generate_test_data.py",
    "zerver/lib/server_initialization.py",
    "zerver/lib/test_fixtures.py",
    "zerver/lib/test_runner.py",
    "zerver/lib/test_console_output.py",
    "zerver/openapi/python_examples.py",
    # Tornado should ideally have full coverage, but we're not there.
    "zerver/tornado/autoreload.py",
    "zerver/tornado/descriptors.py",
    "zerver/tornado/django_api.py",
    "zerver/tornado/event_queue.py",
    "zerver/tornado/exceptions.py",
    "zerver/tornado/handlers.py",
    "zerver/tornado/ioloop_logging.py",
    "zerver/tornado/sharding.py",
    "zerver/tornado/views.py",
    # Data import files; relatively low priority
    "zerver/data_import/sequencer.py",
    "zerver/data_import/slack.py",
    "zerver/data_import/gitter.py",
    "zerver/data_import/import_util.py",
    # Webhook integrations with incomplete coverage
    "zerver/webhooks/greenhouse/view.py",
    "zerver/webhooks/jira/view.py",
    "zerver/webhooks/librato/view.py",
    "zerver/webhooks/pivotal/view.py",
    "zerver/webhooks/semaphore/view.py",
    "zerver/webhooks/solano/view.py",
    "zerver/webhooks/teamcity/view.py",
    "zerver/webhooks/travis/view.py",
    "zerver/webhooks/zapier/view.py",
]

# The coverage globs are expanded at import time, before main() has
# changed into the repository root.  Anchor them to the root explicitly
# (computed the same way main() does: the parent of this script's
# directory) so that running the script from another working directory
# does not silently produce an empty enforcement list.
_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


def _expand_coverage_globs(patterns: List[str]) -> List[str]:
    """Expand glob patterns against the repository root.

    Returns the matching paths relative to the repository root, i.e. in
    the same form the patterns themselves are written in.
    """
    # Escape the root so glob metacharacters in the checkout path are
    # treated literally; only the pattern part is interpreted as a glob.
    escaped_root = glob.escape(_REPO_ROOT)
    return [
        os.path.relpath(match, _REPO_ROOT)
        for pattern in patterns
        for match in glob.glob(os.path.join(escaped_root, pattern))
    ]


# Files whose coverage is actually enforced: everything targeted for full
# coverage, minus the files known not to be there yet.
enforce_fully_covered = sorted(
    set(_expand_coverage_globs(target_fully_covered))
    - set(_expand_coverage_globs(not_yet_fully_covered))
)

# Where the names of the tests that failed in the last run are recorded
# (relative to the repository root).
FAILED_TEST_PATH = "var/last_test_failure.json"


def get_failed_tests() -> List[str]:
    """Return the tests recorded in FAILED_TEST_PATH by an earlier run.

    If the file cannot be opened, or its contents are not valid JSON
    (e.g. a truncated write), a message is printed and an empty list is
    returned, which makes main() fall back to running all tests.
    """
    try:
        with open(FAILED_TEST_PATH, "rb") as f:
            return orjson.loads(f.read())
    except OSError:
        print(f"{FAILED_TEST_PATH} doesn't exist; running all tests.")
        return []
    except orjson.JSONDecodeError:
        # A corrupt file should not make --rerun crash with a traceback.
        print(f"{FAILED_TEST_PATH} is not valid JSON; running all tests.")
        return []


def write_failed_tests(failed_tests: List[str]) -> None:
    """Record the given failed tests as JSON in FAILED_TEST_PATH.

    Nothing is written (and any existing file is left untouched) when
    the list is empty.
    """
    if not failed_tests:
        return
    with open(FAILED_TEST_PATH, "wb") as failure_file:
        failure_file.write(orjson.dumps(failed_tests))


@contextlib.contextmanager
def block_internet() -> Iterator[None]:
    """Run the enclosed code with outgoing HTTP requests blocked.

    All tests are run inside this context manager, so that any code
    attempting to access the internet causes an error to be raised.
    """
    # Monkey-patching: the responses library raises
    # requests.ConnectionError when access to an unregistered URL is
    # attempted.  We substitute our own exception for it, so that it
    # propagates all the way.
    exception_swap = mock.patch.object(
        responses, "ConnectionError", new=ZulipInternetBlockedError
    )
    # The patch is entered first, so RequestsMock is created and
    # activated with the replacement exception already in place.
    with exception_swap, responses.RequestsMock():
        yield


class ZulipInternetBlockedError(Exception):
    """Raised when code under test attempts an outgoing network request.

    The message combines Zulip-specific advice with the original error
    message from the responses library.
    """

    def __init__(self, original_msg: str) -> None:
        # Adjacent string literals are concatenated verbatim, so each
        # piece needs its own trailing space to keep the URL separated
        # from the preceding text.
        zulip_msg = (
            "Outgoing network requests are not allowed in the Zulip tests. "
            "More details and advice are available here: "
            "https://zulip.readthedocs.io/en/latest/testing/testing.html#internet-access-inside-test-suites"
        )
        msg = f"{zulip_msg}\nResponses library error message: {original_msg}"
        super().__init__(msg)


def main() -> None:
    """Run the backend tests as requested on the command line.

    Changes into the repository root, parses the options, maps any test
    names given as arguments to dotted module paths, runs the resulting
    suites with outgoing network requests blocked, optionally collects
    and enforces coverage, and finally exits with a nonzero status if
    anything failed.
    """
    TOOLS_DIR = os.path.dirname(os.path.abspath(__file__))
    os.chdir(os.path.dirname(TOOLS_DIR))
    sys.path.insert(0, os.path.dirname(TOOLS_DIR))

    # os.cpu_count() returns None when the count cannot be determined;
    # fall back to a single process in that case.
    default_parallel = os.cpu_count() or 1

    # Remove proxy settings for running backend tests
    os.environ.pop("http_proxy", "")
    os.environ.pop("https_proxy", "")

    # These imports rely on the sys.path entry added above.
    from tools.lib.test_script import (
        add_provision_check_override_param,
        assert_provisioning_status_ok,
    )
    from zerver.lib.test_fixtures import (
        remove_test_run_directories,
        update_test_databases_if_required,
    )

    os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.test_settings"
    # "-u" uses unbuffered IO, which is important when wrapping it in subprocess
    os.environ["PYTHONUNBUFFERED"] = "y"

    usage = """test-backend [options]
    test-backend # Runs all backend tests
    test-backend zerver.tests.test_markdown # run all tests in a test module
    test-backend zerver/tests/test_markdown.py # run all tests in a test module
    test-backend test_markdown # run all tests in a test module
    test-backend zerver.tests.test_markdown.MarkdownTest # run all tests in a test class
    test-backend MarkdownTest # run all tests in a test class
    test-backend zerver.tests.test_markdown.MarkdownTest.test_inline_youtube # run a single test
    test-backend MarkdownTest.test_inline_youtube # run a single test"""

    parser = argparse.ArgumentParser(
        description=usage, formatter_class=argparse.RawTextHelpFormatter
    )

    parser.add_argument(
        "--nonfatal-errors",
        action="store_false",
        dest="fatal_errors",
        help="Continue past test failures to run all tests",
    )
    parser.add_argument("--coverage", action="store_true", help="Compute test coverage.")
    parser.add_argument(
        "--verbose-coverage", action="store_true", help="Enable verbose print of coverage report."
    )
    parser.add_argument(
        "--no-cov-cleanup", action="store_true", help="Do not clean generated coverage files."
    )

    parser.add_argument(
        "--parallel",
        dest="processes",
        type=int,
        default=default_parallel,
        help="Specify the number of processes to run the "
        "tests in. Default is the number of logical CPUs",
    )
    parser.add_argument("--profile", action="store_true", help="Profile test runtime.")
    add_provision_check_override_param(parser)
    parser.add_argument(
        "--no-shallow",
        action="store_true",
        help="Don't allow shallow testing of templates (deprecated)",
    )
    parser.add_argument("--verbose", action="store_true", help="Show detailed output")
    parser.add_argument("--reverse", action="store_true", help="Run tests in reverse order.")
    parser.add_argument(
        "--rerun",
        action="store_true",
        help=(
            "Run the tests which failed the last time "
            "test-backend was run.  Implies --nonfatal-errors."
        ),
    )
    parser.add_argument(
        "--include-webhooks",
        action="store_true",
        help=("Include webhook tests.  By default, they are skipped for performance."),
    )
    parser.add_argument(
        "--generate-stripe-fixtures",
        action="store_true",
        help=("Generate Stripe test fixtures by making requests to Stripe test network"),
    )
    parser.add_argument("args", nargs="*")
    parser.add_argument(
        "--ban-console-output",
        action="store_true",
        help="Require stdout and stderr to be clean of unexpected output.",
    )

    options = parser.parse_args()
    if options.ban_console_output:
        os.environ["BAN_CONSOLE_OUTPUT"] = "1"

    args = options.args
    parallel = options.processes
    # Coverage runs always include the webhook tests.
    include_webhooks = options.coverage or options.include_webhooks

    if parallel < 1:
        # Report this as a normal usage error (usage line, message, exit
        # status 2) rather than as an uncaught exception with a traceback.
        parser.error("option processes: Only positive integers are allowed.")

    zerver_test_dir = "zerver/tests/"

    # While running --rerun, we read var/last_test_failure.json to get
    # the list of tests that failed on the last run, and then pretend
    # those tests were passed explicitly.  --rerun implies
    # --nonfatal-errors, so that we don't end up removing tests from
    # the list that weren't run.
    if options.rerun:
        parallel = 1
        options.fatal_errors = False
        failed_tests = get_failed_tests()
        if failed_tests:
            args = failed_tests
    if args:
        # If we passed a specific set of tests, run in serial mode.
        parallel = 1

        # to transform forward slashes '/' present in the argument into dots '.'
        for i, suite in enumerate(args):
            args[i] = suite.rstrip("/").replace("/", ".")

        def rewrite_arguments(index: int, suite: str, search_key: str) -> None:
            """Prefix args[index] with the test file that mentions search_key.

            Walks zerver_test_dir and stops at the first file containing
            search_key; args[index] is left alone if no file matches.
            """
            for root, _dirs, files_names in os.walk(zerver_test_dir, topdown=False):
                for file_name in files_names:
                    # Check for files starting with alphanumeric characters and ending with '.py'
                    # Ignore backup files if any
                    if not file_name[0].isalnum() or not file_name.endswith(".py"):
                        continue
                    filepath = os.path.join(root, file_name)
                    # Use a context manager so each scanned file is closed.
                    with open(filepath) as test_file:
                        found = any(search_key in line for line in test_file)
                    if found:
                        args[index] = filepath.replace(".py", ".") + suite
                        return

        # Pass the index of the argument being examined explicitly, so
        # that each rewrite lands on its own argument even when several
        # class names are given.
        for i, suite in enumerate(args):
            if suite[0].isupper() and "test_" in suite:
                # "ClassName.test_method": search for the class name.
                classname = suite.rsplit(".", 1)[0]
                rewrite_arguments(i, suite, classname)
            elif suite[0].isupper():
                rewrite_arguments(i, suite, f"class {suite}(")

        # Bare module names such as "test_markdown" are resolved to their
        # path under zerver_test_dir.
        for i, suite in enumerate(args):
            if suite.startswith("test"):
                for root, _dirs, files_names in os.walk(zerver_test_dir):
                    for file_name in files_names:
                        if file_name == suite or file_name == suite + ".py":
                            new_suite = os.path.join(root, file_name)
                            args[i] = new_suite
                            break

        for i, suite in enumerate(args):
            args[i] = suite.replace(".py", "")

        # to transform forward slashes '/' introduced by the zerver_test_dir into dots '.'
        # taking care of any forward slashes that might be present
        for i, suite in enumerate(args):
            args[i] = suite.replace("/", ".")

    full_suite = len(args) == 0

    if full_suite:
        suites = [
            "zerver.tests",
            "analytics.tests",
            "corporate.tests",
        ]
    else:
        suites = args

    if full_suite and include_webhooks:
        suites.append("zerver.webhooks")

    if options.generate_stripe_fixtures:
        if full_suite:
            # Without explicit arguments, only the Stripe tests are run.
            suites = [
                "corporate.tests.test_stripe",
            ]
            full_suite = False
        os.environ["GENERATE_STRIPE_FIXTURES"] = "1"

    assert_provisioning_status_ok(options.skip_provision_check)

    if options.coverage:
        import coverage

        cov = coverage.Coverage(
            data_suffix="", config_file="tools/coveragerc", concurrency="multiprocessing"
        )
        # Do not clean .coverage file in continuous integration job so that coverage data can be uploaded.
        if not options.no_cov_cleanup:
            import atexit

            atexit.register(cov.erase)  # Ensure the data file gets cleaned up at the end.
        cov.start()
    if options.profile:
        import cProfile

        prof = cProfile.Profile()
        prof.enable()

    # This is kind of hacky, but it's the most reliable way
    # to make sure instrumentation decorators know the
    # setting when they run.
    os.environ["TEST_INSTRUMENT_URL_COVERAGE"] = "TRUE"

    # setup() needs to be called after coverage is started to get proper coverage reports of model
    # files, since part of setup is importing the models for all applications in INSTALLED_APPS.
    django.setup()

    update_test_databases_if_required()

    subprocess.check_call(["tools/webpack", "--test"])

    TestRunner = get_runner(settings)

    if parallel > 1:
        print(f"-- Running tests in parallel mode with {parallel} processes.", flush=True)
    else:
        print("-- Running tests in serial mode.", flush=True)

    with block_internet():
        test_runner = TestRunner(
            failfast=options.fatal_errors,
            verbosity=2,
            parallel=parallel,
            reverse=options.reverse,
            keepdb=True,
        )
        failures, failed_tests = test_runner.run_tests(
            suites, full_suite=full_suite, include_webhooks=include_webhooks
        )
    write_failed_tests(failed_tests)

    templates_not_rendered = test_runner.get_shallow_tested_templates()
    # We only check the templates if all the tests ran and passed
    if not failures and full_suite and templates_not_rendered:
        missed_count = len(templates_not_rendered)
        print(f"\nError: {missed_count} templates have no tests!")
        for template in templates_not_rendered:
            print(f"  {template}")
        print("See zerver/tests/test_templates.py for the exclude list.")
        failures = True

    if options.coverage:
        cov.stop()
        cov.save()
        cov.combine()
        cov.save()
        if options.verbose_coverage:
            print("Printing coverage data")
            cov.report(show_missing=False)
        cov.html_report(directory="var/coverage")
        print("HTML report saved; visit at http://127.0.0.1:9991/coverage/index.html")
    if full_suite and not failures and options.coverage:
        # Assert that various files have full coverage
        for path in enforce_fully_covered:
            missing_lines = cov.analysis2(path)[3]
            if missing_lines:
                print(f"ERROR: {path} no longer has complete backend test coverage")
                print(f"  Lines missing coverage: {missing_lines}")
                print()
                failures = True
        if failures:
            print("It looks like your changes lost 100% test coverage in one or more files")
            print("Usually, the right fix for this is to add some tests.")
            print("But also check out the include/exclude lists in tools/test-backend.")
            print("If this line intentionally is not tested, you can use a # nocoverage comment.")
            print("To run this check locally, use `test-backend --coverage`.")
        # Conversely, flag files that have reached full coverage but are
        # still listed as exceptions.
        ok = True
        for path in not_yet_fully_covered:
            try:
                missing_lines = cov.analysis2(path)[3]
                if not missing_lines and path != "zerver/lib/migrate.py":
                    print(
                        f"ERROR: {path} has complete backend test coverage but is still in not_yet_fully_covered."
                    )
                    ok = False
            except coverage.misc.NoSource:
                continue
        if not ok:
            print()
            print(
                "There are one or more fully covered files that are still in not_yet_fully_covered."
            )
            print("Remove the file(s) from not_yet_fully_covered in `tools/test-backend`.")
            failures = True
    if options.profile:
        prof.disable()
        # delete=False keeps the profile data around after this script exits.
        with tempfile.NamedTemporaryFile(prefix="profile.data.", delete=False) as stats_file:
            prof.dump_stats(stats_file.name)
            print(f"Profile data saved to {stats_file.name}")
            print(f"You can visualize it using e.g. `snakeviz {shlex.quote(stats_file.name)}`")
            print("Note: If you are using vagrant for development environment you will need to do:")
            print("1.) `vagrant ssh -- -L 8080:127.0.0.1:8080`")
            print(f"2.) `snakeviz -s {shlex.quote(stats_file.name)}`")

    # Ideally, we'd check for any leaked test databases here;
    # but that needs some hackery with database names.
    #
    # destroy_leaked_test_databases()

    removed = remove_test_run_directories()
    if removed:
        print(f"Removed {removed} stale test run directories!")

    # We'll have printed whether tests passed or failed above
    sys.exit(bool(failures))


# Run the tests only when this file is executed as a script.
if __name__ == "__main__":
    main()
